Adding your custom environments to Coach allows you to solve your own tasks using any of the predefined algorithms. There are two ways to add your own environment to Coach: either implement it using the OpenAI Gym API and rely on Coach's existing Gym integration, or implement a dedicated environment wrapper class in Coach.
In this tutorial, we'll follow the second option and add the DeepMind Control Suite environment to Coach. We will then create a preset that trains a DDPG agent on one of the levels of the new environment.
First, we will need to install the DeepMind Control Suite library. To do this, follow the installation instructions here: https://github.com/deepmind/dm_control#installation-and-requirements.
Make sure your LD_LIBRARY_PATH contains the paths to the GLEW and GLFW libraries (https://github.com/openai/mujoco-py/issues/110). In addition, MuJoCo rendering might need to be disabled (https://github.com/deepmind/dm_control/issues/20).
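Since these issues depend on the specific system setup, a quick sanity check is to inspect the relevant environment variables from Python before importing dm_control. The snippet below is only a sketch: the exact GLEW/GLFW locations and the preferred rendering backend depend on your installation.
In [ ]:
import os

# sanity-check sketch: verify the dynamic loader path and the rendering backend before importing dm_control.
# The exact GLEW/GLFW locations and the preferred backend ('glfw', 'egl' or 'osmesa') depend on your system.
print(os.environ.get('LD_LIBRARY_PATH', ''))     # should include the directories holding libGLEW / libglfw
print(os.environ.get('MUJOCO_GL', '<not set>'))  # dm_control uses this variable to pick its rendering backend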
To integrate an environment with Coach, we need to implement an environment wrapper. Coach has several predefined environment wrappers which are placed under the environments folder, but we can place our new environment wherever we want and reference it later.
Now let's define the control suite's environment wrapper class.
In the __init__ function, we'll load and initialize the simulator using the level given by self.env_id. Additionally, we will define the state space and action space of the environment through the self.state_space and self.action_space members.
In this case, the state space is a dictionary consisting of 2 observations: 'pixels' (the rendered image) and 'measurements' (a flat vector of the remaining scalar and vector observations).
In [ ]:
import numpy as np
import random
from typing import Union
from dm_control import suite
from dm_control.suite.wrappers import pixels
from rl_coach.base_parameters import VisualizationParameters
from rl_coach.spaces import BoxActionSpace, ImageObservationSpace, VectorObservationSpace, StateSpace
from rl_coach.environments.environment import Environment, LevelSelection
# Environment
class ControlSuiteEnvironment(Environment):
    def __init__(self, level: LevelSelection, frame_skip: int, visualization_parameters: VisualizationParameters,
                 seed: Union[None, int]=None, human_control: bool=False,
                 custom_reward_threshold: Union[int, float]=None, **kwargs):
        super().__init__(level, seed, frame_skip, human_control, custom_reward_threshold, visualization_parameters)

        # load and initialize the environment
        domain_name, task_name = self.env_id.split(":")
        self.env = suite.load(domain_name=domain_name, task_name=task_name)
        self.env = pixels.Wrapper(self.env, pixels_only=False)

        # seed
        if self.seed is not None:
            np.random.seed(self.seed)
            random.seed(self.seed)

        self.state_space = StateSpace({})

        # image observations
        self.state_space['pixels'] = ImageObservationSpace(shape=self.env.observation_spec()['pixels'].shape,
                                                           high=255)

        # measurements observations
        measurements_space_size = 0
        measurements_names = []
        for observation_space_name, observation_space in self.env.observation_spec().items():
            if len(observation_space.shape) == 0:
                measurements_space_size += 1
                measurements_names.append(observation_space_name)
            elif len(observation_space.shape) == 1:
                measurements_space_size += observation_space.shape[0]
                measurements_names.extend(["{}_{}".format(observation_space_name, i) for i in
                                           range(observation_space.shape[0])])
        self.state_space['measurements'] = VectorObservationSpace(shape=measurements_space_size,
                                                                  measurements_names=measurements_names)

        # actions
        self.action_space = BoxActionSpace(
            shape=self.env.action_spec().shape[0],
            low=self.env.action_spec().minimum,
            high=self.env.action_spec().maximum
        )

        # initialize the state by getting a new state from the environment
        self.reset_internal_state(True)
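To see where these observations come from, it can help to poke at the dm_control environment on its own, outside of Coach. The following is just an exploratory sketch (using 'cartpole:balance' as an example level): it prints the observation spec, where 'pixels' is the image added by the pixels.Wrapper and the remaining entries are the low-dimensional measurements that the loop above flattens into a single vector.
In [ ]:
import numpy as np
from dm_control import suite
from dm_control.suite.wrappers import pixels

# exploratory sketch, independent of Coach: inspect the observation and action specs
env = pixels.Wrapper(suite.load(domain_name='cartpole', task_name='balance'), pixels_only=False)

for name, spec in env.observation_spec().items():
    print(name, spec.shape)   # 'pixels' plus the task's own scalar/vector observations

# take a single random step and inspect the resulting TimeStep
action_spec = env.action_spec()
env.reset()
action = np.random.uniform(action_spec.minimum, action_spec.maximum, size=action_spec.shape)
time_step = env.step(action)
print(time_step.reward, time_step.last())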
The following functions cover the API expected from a new environment wrapper:

_update_state - updates the internal state of the wrapper (to be queried by the agent), which consists of:
    self.state - a dictionary containing all the observations from the environment, following the state space definition
    self.reward - a float value containing the reward for the last step of the environment
    self.done - a boolean flag which signals if the environment episode has ended
    self.goal - a numpy array representing the goal the environment has set for the last step
    self.info - a dictionary that contains any additional information for the last step
_take_action - gets the action from the agent and performs a single step in the environment
_restart_environment_episode - restarts the environment for a new episode
get_rendered_image - returns a rendered image of the environment in its current state
In [ ]:
class ControlSuiteEnvironment(Environment):
    def _update_state(self):
        self.state = {}

        self.pixels = self.last_result.observation['pixels']
        self.state['pixels'] = self.pixels

        self.measurements = np.array([])
        for sub_observation in self.last_result.observation.values():
            if isinstance(sub_observation, np.ndarray) and len(sub_observation.shape) == 1:
                self.measurements = np.concatenate((self.measurements, sub_observation))
            else:
                self.measurements = np.concatenate((self.measurements, np.array([sub_observation])))
        self.state['measurements'] = self.measurements

        self.reward = self.last_result.reward if self.last_result.reward is not None else 0

        self.done = self.last_result.last()

    def _take_action(self, action):
        if type(self.action_space) == BoxActionSpace:
            action = self.action_space.clip_action_to_space(action)

        self.last_result = self.env.step(action)

    def _restart_environment_episode(self, force_environment_reset=False):
        self.last_result = self.env.reset()

    def get_rendered_image(self):
        return self.env.physics.render(camera_id=0)
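At this point, the wrapper can already be exercised on its own as a quick smoke test, before building a preset around it. The snippet below is only a sketch: it assumes the Coach base Environment class exposes step() and reset_internal_state(), and that BoxActionSpace provides a sample() method; names and defaults may differ slightly between Coach versions.
In [ ]:
# hypothetical smoke test for the wrapper (not part of the original tutorial).
# Assumes Environment.step() / reset_internal_state() and BoxActionSpace.sample()
# are available in your Coach version.
env = ControlSuiteEnvironment(level='cartpole:balance', frame_skip=1,
                              visualization_parameters=VisualizationParameters())
env.reset_internal_state()
for _ in range(10):
    action = env.action_space.sample()   # random action within the Box bounds
    env.step(action)                     # calls _take_action() and then _update_state()
    print(env.state['measurements'].shape, env.reward, env.done)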
Finally, we will need to define a parameters class corresponding to our environment class.
In [ ]:
from rl_coach.environments.environment import EnvironmentParameters
from rl_coach.filters.filter import NoInputFilter, NoOutputFilter


# Parameters
class ControlSuiteEnvironmentParameters(EnvironmentParameters):
    def __init__(self):
        super().__init__()
        self.default_input_filter = NoInputFilter()
        self.default_output_filter = NoOutputFilter()

    @property
    def path(self):
        return 'environments.control_suite_environment:ControlSuiteEnvironment'
Now that we have our new environment, we will want to use one of the predefined algorithms to try and solve it.
In this case, since the environment defines a continuous action space, we will want to use an algorithm that supports continuous actions, so we will select DDPG. To run DDPG on the environment, we will need to define a preset for it.
The new preset will typically be defined in a new file - presets/ControlSuite_DDPG.py.
First - let's define the agent parameters. We can use the default parameters for the DDPG agent, except that we need to update the networks' input embedders to point to the correct environment observation. When we defined the environment, we set it to have 2 observations - 'pixels' and 'measurements'. In this case, we will want to learn only from the measurements, so we will need to modify the default input embedders to point to 'measurements' instead of the default 'observation' defined in DDPGAgentParameters.
In [ ]:
from rl_coach.agents.ddpg_agent import DDPGAgentParameters

agent_params = DDPGAgentParameters()

# rename the input embedder key from 'observation' to 'measurements'
agent_params.network_wrappers['actor'].input_embedders_parameters['measurements'] = \
    agent_params.network_wrappers['actor'].input_embedders_parameters.pop('observation')
agent_params.network_wrappers['critic'].input_embedders_parameters['measurements'] = \
    agent_params.network_wrappers['critic'].input_embedders_parameters.pop('observation')
Now let's define the environment parameters. The DeepMind Control Suite environment has many levels to select from. The level can be specified either as a specific level name, for example 'cartpole:swingup', or as a list of level names from which a single level should be selected. The latter can be done using the SingleLevelSelection class, and then the level can be selected from the command line using the -lvl flag.
In [ ]:
from rl_coach.environments.control_suite_environment import ControlSuiteEnvironmentParameters, control_suite_envs
from rl_coach.environments.environment import SingleLevelSelection
env_params = ControlSuiteEnvironmentParameters(level='cartpole:balance')
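Alternatively, to leave the level choice to the -lvl command line flag mentioned above, the preset can use SingleLevelSelection over the set of available levels. This is a small sketch, assuming control_suite_envs (imported above) enumerates the available level names:
In [ ]:
# let the user pick the level at run time with the -lvl flag, e.g. -lvl cartpole:swingup
# (sketch; assumes control_suite_envs enumerates the available levels)
env_params = ControlSuiteEnvironmentParameters(level=SingleLevelSelection(control_suite_envs))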
We will also need to define a schedule for the training. The schedule defines the number of steps we want to run the experiment for and when to evaluate the trained model. In this case, we will use a simple predefined schedule, and just add some heatup steps to fill the agent's memory buffers with initial data.
In [ ]:
from rl_coach.graph_managers.graph_manager import SimpleSchedule
from rl_coach.core_types import EnvironmentSteps
schedule_params = SimpleSchedule()
schedule_params.heatup_steps = EnvironmentSteps(1000)
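If you need finer control than SimpleSchedule's defaults, the individual schedule fields can also be set explicitly. The attribute names below follow Coach's ScheduleParameters; the values are purely illustrative.
In [ ]:
from rl_coach.core_types import TrainingSteps, EnvironmentEpisodes

# illustrative values only -- tune them for your task
schedule_params.improve_steps = TrainingSteps(10000000)                     # total training steps
schedule_params.steps_between_evaluation_periods = EnvironmentEpisodes(20)  # how often to evaluate
schedule_params.evaluation_steps = EnvironmentEpisodes(1)                   # episodes per evaluation
schedule_params.heatup_steps = EnvironmentSteps(1000)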
We will also want to see the simulator in action (otherwise we will miss all the fun), so let's set the render flag to True in the visualization parameters.
In [ ]:
from rl_coach.base_parameters import VisualizationParameters
vis_params = VisualizationParameters(render=True)
Finally, we'll create and run the graph manager.
In [ ]:
from rl_coach.graph_managers.basic_rl_graph_manager import BasicRLGraphManager

graph_manager = BasicRLGraphManager(agent_params=agent_params, env_params=env_params,
                                    schedule_params=schedule_params, vis_params=vis_params)

# let the adventure begin
graph_manager.improve()
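If the preset is saved as presets/ControlSuite_DDPG.py as suggested above, the same experiment should also be launchable through Coach's command line launcher, for example coach -p ControlSuite_DDPG (adding -lvl with a level name when the preset uses SingleLevelSelection). The exact invocation may vary slightly between Coach versions.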